// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! A transaction can have multiple operations on state. For example, it might update values
//! for a few existing keys. Imagine that we have the following tree.
//!
//! ```text
//!                 root0
//!                 /    \
//!                /      \
//!  key1 => value11        key2 => value21
//! ```
//!
//! The next transaction updates `key1`'s value to `value12` and `key2`'s value to `value22`.
//! Let's assume we update key2 first. Then the tree becomes:
//!
//! ```text
//!                   (on disk)              (in memory)
//!                     root0                  root1'
//!                    /     \                /     \
//!                   /   ___ \ _____________/       \
//!                  /  _/     \                      \
//!                 / _/        \                      \
//!                / /           \                      \
//!   key1 => value11           key2 => value21       key2 => value22
//!      (on disk)                 (on disk)            (in memory)
//! ```
//!
//! Note that
//!   1) we created a new version of the tree with `root1'` and the new `key2` node generated;
//!   2) both `root1'` and the new `key2` node are still held in memory within a batch of nodes
//!      that will be written into db atomically.
//!
//! Next, we need to update `key1`'s value. This time we are dealing with the tree starting from
//! the new root. Part of the tree is in memory and the rest of it is in database. We'll update the
//! left child and the new root. We should
//!   1) create a new version for `key1` child.
//!   2) update `root1'` directly instead of making another version.
//! The resulting tree should look like:
//!
//! ```text
//!                   (on disk)                                     (in memory)
//!                     root0                                         root1''
//!                    /     \                                       /     \
//!                   /       \                                     /       \
//!                  /         \                                   /         \
//!                 /           \                                 /           \
//!                /             \                               /             \
//!   key1 => value11             key2 => value21  key1 => value12              key2 => value22
//!      (on disk)                   (on disk)       (in memory)                  (in memory)
//! ```
//!
//! This means that we need to be able to tell whether to create a new version of a node or to
//! update an existing node by deleting it and creating a new node directly. `TreeCache` provides
//! APIs to cache intermediate nodes and blobs in memory and simplify the actual tree
//! implementation.
//!
//! If we are dealing with a single-version tree, any complex tree operation can be seen as a
//! collection of the following operations:
//!   - Put a new node/blob.
//!   - Delete a node/blob.
//! When we apply these operations on a multi-version tree:
//!   1) Put a new node.
//!   2) When we remove a node, if the node is in the previous on-disk version, we don't need to do
//!      anything. Otherwise we delete it from the tree cache.
//! Updating a node can be performed as a deletion of the node followed by an insertion of the
//! updated node.

#[cfg(test)]
mod tree_cache_test;

use crate::{node_type::Node, TreeReader, TreeUpdateBatch};
use crypto::{hash::SPARSE_MERKLE_PLACEHOLDER_HASH, HashValue};
use failure::prelude::*;
use std::{
    collections::{hash_map::Entry, HashMap},
    convert::Into,
};
use types::account_state_blob::AccountStateBlob;

/// `FrozenTreeCache` is used as a field of `TreeCache` storing all the nodes and blobs that
/// are generated by earlier transactions so they have to be immutable. The motivation of
/// `FrozenTreeCache` is to let `TreeCache` freeze intermediate results from each transaction to
/// help commit more than one transaction in a row atomically.
///
/// Entries are only ever added here by `TreeCache::freeze`; the delete operations of `TreeCache`
/// never touch this structure.
#[derive(Default)]
struct FrozenTreeCache {
    /// Immutable node_cache: nodes moved out of `TreeCache::node_cache` by `freeze`, keyed by
    /// node hash.
    node_cache: HashMap<HashValue, Node>,

    /// Immutable blob_cache: blobs moved out of `TreeCache::blob_cache` by `freeze`, keyed by
    /// blob hash. The per-blob counters kept by `TreeCache` are dropped when freezing.
    blob_cache: HashMap<HashValue, AccountStateBlob>,

    /// Frozen root hashes after each earlier transaction. One hash is appended per `freeze`
    /// call, in call order.
    root_hashes: Vec<HashValue>,
}

/// `TreeCache` is an in-memory cache for per-transaction updates of sparse Merkle nodes and value
/// blobs.
///
/// Lookups consult the mutable caches first, then the frozen cache, and finally fall back to the
/// underlying `reader`.
pub struct TreeCache<'a, R: 'a + TreeReader> {
    /// Current root node hash in cache.
    root_hash: HashValue,

    /// Intermediate nodes keyed by node hash. These belong to the transaction currently being
    /// applied and may still be deleted until `freeze` is called.
    node_cache: HashMap<HashValue, Node>,

    /// Intermediate value blobs keyed by blob hash. It is reasonable to assume the blob data
    /// associated with distinct keys could be the same, so the value of each entry is a tuple,
    /// (blob, blob_counter). The first time a blob is put in, `blob_counter = 1`; after that,
    /// the corresponding `blob_counter` will increment by 1 each time the same blob is put in
    /// again. Deletion follows a similar rule: the blob entry will be removed from
    /// `blob_cache` only when `blob_counter == 1`; otherwise, only `blob_counter` will
    /// decrement by 1.
    blob_cache: HashMap<HashValue, (AccountStateBlob, usize)>,

    /// The immutable part of this cache, filled by `freeze`.
    frozen_cache: FrozenTreeCache,

    /// The underlying persistent storage.
    reader: &'a R,
}

impl<'a, R> TreeReader for TreeCache<'a, R>
where
    R: 'a + TreeReader,
{
    /// Looks up the node stored under `node_hash`.
    ///
    /// The mutable node cache is consulted first, then the frozen node cache; only when both
    /// miss is the request forwarded to the underlying `reader`, whose result (including any
    /// error) is returned as is.
    fn get_node(&self, node_hash: HashValue) -> Result<Node> {
        let cached = self
            .node_cache
            .get(&node_hash)
            .or_else(|| self.frozen_cache.node_cache.get(&node_hash));
        match cached {
            Some(node) => Ok(node.clone()),
            None => self.reader.get_node(node_hash),
        }
    }

    /// Looks up the blob stored under `blob_hash`.
    ///
    /// The mutable blob cache is consulted first (ignoring its counter), then the frozen blob
    /// cache; only when both miss is the request forwarded to the underlying `reader`, whose
    /// result (including any error) is returned as is.
    fn get_blob(&self, blob_hash: HashValue) -> Result<AccountStateBlob> {
        let cached = self
            .blob_cache
            .get(&blob_hash)
            .map(|(blob, _counter)| blob)
            .or_else(|| self.frozen_cache.blob_cache.get(&blob_hash));
        match cached {
            Some(blob) => Ok(blob.clone()),
            None => self.reader.get_blob(blob_hash),
        }
    }
}

impl<'a, R> TreeCache<'a, R>
where
    R: 'a + TreeReader,
{
    /// Builds a `TreeCache` on top of `reader` whose current root is `root_hash`. All caches
    /// start out empty.
    pub fn new(reader: &'a R, root_hash: HashValue) -> Self {
        Self {
            root_hash,
            node_cache: HashMap::new(),
            blob_cache: HashMap::new(),
            frozen_cache: FrozenTreeCache::default(),
            reader,
        }
    }

    /// Returns the current root node, or `None` when the current root hash is
    /// `SPARSE_MERKLE_PLACEHOLDER_HASH`. Fails if the root node cannot be fetched.
    pub fn get_root_node(&self) -> Result<Option<Node>> {
        if self.root_hash == *SPARSE_MERKLE_PLACEHOLDER_HASH {
            return Ok(None);
        }
        let root_node = self.get_node(self.root_hash)?;
        Ok(Some(root_node))
    }

    /// Replaces the current root hash with `root_hash`.
    pub fn set_root_hash(&mut self, root_hash: HashValue) {
        self.root_hash = root_hash;
    }

    /// Inserts `new_node` into the mutable node cache under `node_hash`.
    ///
    /// Returns an error, leaving the cache untouched, if the mutable node cache already holds
    /// an entry for `node_hash`.
    pub fn put_node(&mut self, node_hash: HashValue, new_node: Node) -> Result<()> {
        match self.node_cache.entry(node_hash) {
            Entry::Vacant(slot) => {
                slot.insert(new_node);
                Ok(())
            }
            Entry::Occupied(_) => {
                bail!("Node with key {:?} already exists in NodeBatch", node_hash);
            }
        }
    }

    /// Removes the node stored under `old_node_hash` from the mutable node cache.
    pub fn delete_node(&mut self, old_node_hash: HashValue) {
        // A miss means the node was not created by the in-flight transaction (e.g. it belongs
        // to the previous on-disk version of the tree), so there is nothing to undo and the
        // removal is a no-op.
        self.node_cache.remove(&old_node_hash);
    }

    /// Records one more use of the blob stored under `blob_hash`. Duplicate blobs are allowed:
    /// the first put stores `new_blob` with a counter of 1, every later put of the same hash
    /// only bumps the counter and keeps the blob that is already cached.
    pub fn put_blob(&mut self, blob_hash: HashValue, new_blob: AccountStateBlob) -> Result<()> {
        match self.blob_cache.entry(blob_hash) {
            Entry::Occupied(mut slot) => slot.get_mut().1 += 1,
            Entry::Vacant(slot) => {
                slot.insert((new_blob, 1));
            }
        }
        Ok(())
    }

    /// Drops one use of the blob stored under `old_blob_hash` from the mutable blob cache.
    pub fn delete_blob(&mut self, old_blob_hash: HashValue) {
        // A miss means the blob was not put by the in-flight transaction (e.g. it belongs to
        // the previous on-disk version of the tree), so nothing happens.
        if let Entry::Occupied(mut slot) = self.blob_cache.entry(old_blob_hash) {
            match slot.get().1 {
                // Last remaining use: the entry itself goes away.
                0 | 1 => {
                    slot.remove();
                }
                // Other uses remain: only the counter is decremented.
                _ => slot.get_mut().1 -= 1,
            }
        }
    }

    /// Makes everything accumulated so far immutable: the current root hash is appended to the
    /// frozen root hashes, and all entries of `node_cache` and `blob_cache` are moved into the
    /// frozen cache, leaving both mutable caches empty.
    pub fn freeze(&mut self) {
        let frozen = &mut self.frozen_cache;
        frozen.root_hashes.push(self.root_hash);
        frozen.node_cache.extend(self.node_cache.drain());
        // The frozen blob cache stores bare blobs, so the use counter is discarded here.
        frozen.blob_cache.extend(
            self.blob_cache
                .drain()
                .map(|(blob_hash, (blob, _counter))| (blob_hash, blob)),
        );
    }
}

impl<'a, R> Into<(Vec<HashValue>, TreeUpdateBatch)> for TreeCache<'a, R>
where
    R: 'a + TreeReader,
{
    fn into(self) -> (Vec<HashValue>, TreeUpdateBatch) {
        (
            self.frozen_cache.root_hashes,
            TreeUpdateBatch {
                node_batch: self.frozen_cache.node_cache,
                blob_batch: self.frozen_cache.blob_cache,
            },
        )
    }
}
